Add quickwit-datafusion crate #6270

alexanderbianchi wants to merge 1 commit into quickwit-oss:main
Conversation
force-pushed from a7618bc to f8c18a9
force-pushed from f8c18a9 to 80da189
Asked Claude to give me a strategy on how to review.

PR #6270 Review Plan: Add quickwit-datafusion crate

Scope: The local branch has the parent PR #6237 already merged to main. The actual PR changes are the uncommitted modifications (10 existing files) plus all untracked files (~5300 lines in the new crate, integration tests, proto definitions). Total: ~7100 additions.

Review Order (dependency-driven, bottom-up)

- Phase 1: Protocol & Interface Contract. Review the proto definition and generated code first; everything else depends on this API surface.
- Phase 2: Core Abstractions (the generic execution layer). These files form the extension-point architecture. Review for soundness and future-proofing.
- Phase 3: Distributed Execution. Critical path; correctness bugs here affect query results.
- Phase 4: Metrics Data Source (the concrete implementation). The bulk of domain logic lives here.
- Phase 5: Service Layer & gRPC Integration. Where the crate meets the outside world.
- Phase 6: Integration Tests. Validate end-to-end correctness claims.
- Phase 7: Cross-Cutting Concerns

Key Risk Areas to Prioritize
```rust
// throughput. Ingesters (routers) do so to update their shard table.
let local_shards_update_listener_handle_opt = if node_config
    .is_service_enabled(QuickwitService::ControlPlane)
    || node_config.is_service_enabled(QuickwitService::Indexer)
// …
    None
};
// Mount gRPC OpenTelemetry OTLP services if present.
let otlp_metrics_grpc_service =
```
was this enabled just for your local testing? we probably don't need this (yet)
```rust
pub struct MetricsSplitQuery {
    pub metric_names: Option<Vec<String>>,
    pub time_range_start: Option<u64>,
    pub time_range_end: Option<u64>,
```
i checked what we're actually doing during ingestion: the only fields being populated (and therefore usable) are `metric_names`, `time_range_start`, `time_range_end`, and `tag_service`; everything else will have empty arrays 😅
but i believe these `tag_*` fields in the metastore are going to go away pretty soon with @g-talbot's changes. they'll be replaced with a `zonemap_regex` map similar to what husky does - it basically acts as a bloom filter, telling us whether a value for a tag definitely does not exist in the file, or maybe does. we'll need custom logic to evaluate the zonemap regex against the query predicates, which i'm sure exists somewhere.
so postgres gives us the files that contain metric X and are within a given time frame, then in "application code" we use the predicates + zone maps to prune files we don't care about.
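As a rough sketch of the pruning flow described above. The `ZoneMap` type, its `may_contain` check, and the `prune_files` helper are all hypothetical placeholders (the real `zonemap_regex` structure is still being designed); here the zone map is simplified to a set of possible tag values rather than a regex:

```rust
use std::collections::HashMap;

/// Hypothetical per-file zone map: for each tag, the values that *may*
/// appear in the file. A real implementation would be a regex /
/// bloom-filter-like structure, as described in the review thread.
struct ZoneMap {
    tag_values: HashMap<String, Vec<String>>,
}

impl ZoneMap {
    /// Returns false only if the tag value definitely does not appear.
    fn may_contain(&self, tag: &str, value: &str) -> bool {
        match self.tag_values.get(tag) {
            Some(values) => values.iter().any(|v| v == value),
            // Unknown tag: be conservative and keep the file.
            None => true,
        }
    }
}

/// Keep only the files whose zone maps might satisfy every `tag = value`
/// predicate; postgres has already filtered by metric name + time range.
fn prune_files<'a>(
    files: &'a [(String, ZoneMap)],
    predicates: &[(String, String)],
) -> Vec<&'a str> {
    files
        .iter()
        .filter(|(_, zm)| {
            predicates.iter().all(|(tag, value)| zm.may_contain(tag, value))
        })
        .map(|(name, _)| name.as_str())
        .collect()
}

fn main() {
    let files = vec![
        (
            "split-a.parquet".to_string(),
            ZoneMap {
                tag_values: HashMap::from([(
                    "service".to_string(),
                    vec!["ingest".to_string(), "search".to_string()],
                )]),
            },
        ),
        (
            "split-b.parquet".to_string(),
            ZoneMap {
                tag_values: HashMap::from([(
                    "service".to_string(),
                    vec!["control-plane".to_string()],
                )]),
            },
        ),
    ];
    let predicates = vec![("service".to_string(), "ingest".to_string())];
    // Only split-a can contain service = "ingest".
    println!("{:?}", prune_files(&files, &predicates));
}
```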
im fine with keeping this stuff as is, we can come back to this once we're actually writing + compacting files with the zonemaps. if it's worth it, i can fix what we have to actually populate the other fields, until the zonemap stuff is ready.
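For illustration, the CAST-unwrapping that lets a filter like `CAST(ts) > 1000` still populate the time-range fields of `MetricsSplitQuery` boils down to looking through cast wrappers before matching the column. The miniature `Expr` type and both function names below are hypothetical stand-ins, not the PR's actual DataFusion code:

```rust
/// Hypothetical miniature of a physical filter expression.
#[derive(Debug, PartialEq)]
enum Expr {
    Column(String),
    Cast(Box<Expr>),
    Gt(Box<Expr>, u64),
}

/// Strip any number of CAST wrappers to reach the underlying column,
/// so `CAST(ts AS BIGINT) > 1000` still yields a time-range bound.
fn unwrap_casts(expr: &Expr) -> &Expr {
    match expr {
        Expr::Cast(inner) => unwrap_casts(inner),
        other => other,
    }
}

/// Extract a lower time bound from a `ts > N` predicate, looking
/// through casts on the column side of the comparison.
fn extract_time_lower_bound(filter: &Expr) -> Option<u64> {
    if let Expr::Gt(lhs, value) = filter {
        if let Expr::Column(name) = unwrap_casts(lhs) {
            if name == "ts" {
                return Some(*value);
            }
        }
    }
    None
}

fn main() {
    let filter = Expr::Gt(
        Box::new(Expr::Cast(Box::new(Expr::Column("ts".to_string())))),
        1_000,
    );
    println!("{:?}", extract_time_lower_bound(&filter)); // Some(1000)
}
```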
Overview
Adds `quickwit-datafusion`, a DataFusion-based query execution layer for parquet metrics splits, built on top of PR #6237 (wide-schema parquet pipeline).

What's in this PR

- `quickwit-datafusion` crate: self-contained query layer with a pluggable `QuickwitDataSource` trait, `DataFusionSessionBuilder`, `QuickwitSchemaProvider`, and `DataFusionService` (streaming gRPC: `ExecuteSql` + `ExecuteSubstrait`)
- `MetricsDataSource`: parquet metrics source with metastore-backed split discovery, a 30s object-store cache, filter pushdown with a CAST-unwrapping fix, and Substrait `ReadRel` consumption
- `DistributedPhysicalOptimizerRule`: decomposes queries into tasks (one per split) via `PartitionIsolatorExec`. Tasks, not shuffles: parquet splits are self-contained, so no cross-worker repartitioning is needed
- `DataFusionService` only starts when `QW_ENABLE_DATAFUSION_ENDPOINT=true` is set; zero impact on existing deployments

Execution Flow
```mermaid
flowchart TD
    Client -->|SQL or Substrait bytes| SVC[DataFusionService\nExecuteSql / ExecuteSubstrait]
    subgraph Coordinator
        SVC --> SB[DataFusionSessionBuilder\nbuild_session]
        SB --> CTX[SessionContext\n+ QuickwitSchemaProvider]
        CTX -->|Substrait| SC[QuickwitSubstraitConsumer\nconsume_read ReadRel]
        CTX -->|SQL| SP[QuickwitSchemaProvider\ntable lookup]
        SC --> MD[MetricsDataSource]
        SP --> MD
        MD --> IR[MetastoreIndexResolver\nresolve index_name]
        IR -->|split_provider · object_store · url\n30 s object-store cache| TP[MetricsTableProvider]
        TP --> OPT[DataFusion Optimizer]
        OPT -->|SearcherPool ≥ 2 nodes| DIST[DistributedPhysicalOptimizerRule]
        DIST --> EST[QuickwitTaskEstimator\nDesired N = num splits]
        EST --> PIE[PartitionIsolatorExec\nt0 gets splits 0,1 · t1 gets splits 2,3 …]
        PIE -->|WorkerService gRPC| WORKERS[(Workers)]
    end
    subgraph Worker [Worker, runs per task]
        WORKERS --> SCAN[MetricsTableProvider::scan\npushed-down filters]
        SCAN --> EXT[extract_split_filters\nMetricsSplitQuery\nCAST-unwrapping fix]
        EXT --> MSP[MetastoreSplitProvider\nlist_splits → metastore RPC]
        MSP -->|published splits matching\nmetric_names · time_range · tags| PS[ParquetSource\nbloom filter · page index · pushdown]
        PS --> OBS[QuickwitObjectStore\nquickwit_storage::Storage bridge]
    end
    OBS --> AGG[Partial aggregates]
    AGG --> NCE[NetworkCoalesceExec\ncoordinator merges]
    NCE -->|Arrow IPC stream| Client
```

Notes
- When `SearcherPool` has only one node, the distributed rule is a no-op: the plan runs locally as a standard `DataSourceExec`.
- `PartitionIsolatorExec` assigns each split to a specific worker task; workers execute their local parquet scans and return partial aggregates. No `NetworkShuffleExec` (no cross-worker repartitioning) because splits are already self-contained.
- `QuickwitObjectStore` is a read-only adapter: `get_opts`, `get_range`, and `head` are implemented; all write/list operations return `NotSupported`.
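A minimal sketch of the chunked split-to-task assignment described in the second note (t0 gets splits 0,1; t1 gets splits 2,3; no shuffle). `assign_splits` is an illustrative helper, not the actual `PartitionIsolatorExec` implementation:

```rust
/// Assign split indices to worker tasks in contiguous chunks, matching
/// the "t0 gets splits 0,1 · t1 gets splits 2,3" scheme in the diagram.
fn assign_splits(num_splits: usize, num_tasks: usize) -> Vec<Vec<usize>> {
    let mut tasks = vec![Vec::new(); num_tasks];
    if num_tasks == 0 || num_splits == 0 {
        return tasks;
    }
    // Ceiling division so every split lands in some task.
    let chunk = (num_splits + num_tasks - 1) / num_tasks;
    for split in 0..num_splits {
        tasks[split / chunk].push(split);
    }
    tasks
}

fn main() {
    println!("{:?}", assign_splits(4, 2)); // [[0, 1], [2, 3]]
    println!("{:?}", assign_splits(5, 2)); // [[0, 1, 2], [3, 4]]
}
```

Because each split is self-contained, each task can scan and partially aggregate its chunk independently, and the coordinator only needs to merge partial aggregates (`NetworkCoalesceExec`) rather than repartition rows between workers.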